home *** CD-ROM | disk | FTP | other *** search
/ Freelog 125 / Freelog_MarsAvril2015_No125.iso / Musique / Quod Libet / quodlibet-3.3.0-installer.exe / bin / multiprocessing / pool.pyc (.txt) < prev    next >
Python Compiled Bytecode  |  2014-12-31  |  21KB  |  657 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.7)
  3.  
# Only the Pool class is part of this module's public API.
__all__ = [
    'Pool']
  6. import threading
  7. import Queue
  8. import itertools
  9. import collections
  10. import time
  11. from multiprocessing import Process, cpu_count, TimeoutError
  12. from multiprocessing.util import Finalize, debug
# State flags shared by the pool and its handler threads;
# a pool moves RUN -> CLOSE -> TERMINATE.
RUN = 0
CLOSE = 1
TERMINATE = 2
# Module-global counter yielding a unique id for every job submitted to a pool.
job_counter = itertools.count()
  17.  
  18. def mapstar(args):
  19.     return map(*args)
  20.  
  21.  
  22. class MaybeEncodingError(Exception):
  23.     '''Wraps possible unpickleable errors, so they can be
  24.     safely sent through the socket.'''
  25.     
  26.     def __init__(self, exc, value):
  27.         self.exc = repr(exc)
  28.         self.value = repr(value)
  29.         super(MaybeEncodingError, self).__init__(self.exc, self.value)
  30.  
  31.     
  32.     def __str__(self):
  33.         return "Error sending result: '%s'. Reason: '%s'" % (self.value, self.exc)
  34.  
  35.     
  36.     def __repr__(self):
  37.         return '<MaybeEncodingError: %s>' % str(self)
  38.  
  39.  
  40.  
  41. def worker(inqueue, outqueue, initializer = None, initargs = (), maxtasks = None):
  42.     if not maxtasks is None:
  43.         if not type(maxtasks) == int or maxtasks > 0:
  44.             raise AssertionError
  45.         put = None.put
  46.         get = inqueue.get
  47.         if hasattr(inqueue, '_writer'):
  48.             inqueue._writer.close()
  49.             outqueue._reader.close()
  50.         if initializer is not None:
  51.             initializer(*initargs)
  52.         completed = 0
  53.         while (maxtasks is None or maxtasks) and completed < maxtasks:
  54.             
  55.             try:
  56.                 task = get()
  57.             except (EOFError, IOError):
  58.                 debug('worker got EOFError or IOError -- exiting')
  59.                 break
  60.  
  61.             if task is None:
  62.                 debug('worker got sentinel -- exiting')
  63.                 break
  64.             (job, i, func, args, kwds) = task
  65.             
  66.             try:
  67.                 result = (True, func(*args, **kwds))
  68.             except Exception:
  69.                 e = None
  70.                 result = (False, e)
  71.  
  72.             
  73.             try:
  74.                 put((job, i, result))
  75.             except Exception:
  76.                 e = None
  77.                 wrapped = MaybeEncodingError(e, result[1])
  78.                 debug('Possible encoding error while sending result: %s' % wrapped)
  79.                 put((job, i, (False, wrapped)))
  80.  
  81.             completed += 1
  82.         debug('worker exiting after %d tasks' % completed)
  83.         return None
  84.  
  85.  
  86. class Pool(object):
  87.     '''
  88.     Class which supports an async version of the `apply()` builtin
  89.     '''
  90.     Process = Process
  91.     
  92.     def __init__(self, processes = None, initializer = None, initargs = (), maxtasksperchild = None):
  93.         self._setup_queues()
  94.         self._taskqueue = Queue.Queue()
  95.         self._cache = { }
  96.         self._state = RUN
  97.         self._maxtasksperchild = maxtasksperchild
  98.         self._initializer = initializer
  99.         self._initargs = initargs
  100.         if processes is None:
  101.             
  102.             try:
  103.                 processes = cpu_count()
  104.             except NotImplementedError:
  105.                 processes = 1
  106.             
  107.  
  108.         if processes < 1:
  109.             raise ValueError('Number of processes must be at least 1')
  110.         if initializer is not None and not hasattr(initializer, '__call__'):
  111.             raise TypeError('initializer must be a callable')
  112.         self._processes = processes
  113.         self._pool = []
  114.         self._repopulate_pool()
  115.         self._worker_handler = threading.Thread(target = Pool._handle_workers, args = (self,))
  116.         self._worker_handler.daemon = True
  117.         self._worker_handler._state = RUN
  118.         self._worker_handler.start()
  119.         self._task_handler = threading.Thread(target = Pool._handle_tasks, args = (self._taskqueue, self._quick_put, self._outqueue, self._pool))
  120.         self._task_handler.daemon = True
  121.         self._task_handler._state = RUN
  122.         self._task_handler.start()
  123.         self._result_handler = threading.Thread(target = Pool._handle_results, args = (self._outqueue, self._quick_get, self._cache))
  124.         self._result_handler.daemon = True
  125.         self._result_handler._state = RUN
  126.         self._result_handler.start()
  127.         self._terminate = Finalize(self, self._terminate_pool, args = (self._taskqueue, self._inqueue, self._outqueue, self._pool, self._worker_handler, self._task_handler, self._result_handler, self._cache), exitpriority = 15)
  128.  
  129.     
  130.     def _join_exited_workers(self):
  131.         '''Cleanup after any worker processes which have exited due to reaching
  132.         their specified lifetime.  Returns True if any workers were cleaned up.
  133.         '''
  134.         cleaned = False
  135.         for i in reversed(range(len(self._pool))):
  136.             worker = self._pool[i]
  137.             if worker.exitcode is not None:
  138.                 debug('cleaning up worker %d' % i)
  139.                 worker.join()
  140.                 cleaned = True
  141.                 del self._pool[i]
  142.                 continue
  143.         return cleaned
  144.  
  145.     
  146.     def _repopulate_pool(self):
  147.         '''Bring the number of pool processes up to the specified number,
  148.         for use after reaping workers which have exited.
  149.         '''
  150.         for i in range(self._processes - len(self._pool)):
  151.             w = self.Process(target = worker, args = (self._inqueue, self._outqueue, self._initializer, self._initargs, self._maxtasksperchild))
  152.             self._pool.append(w)
  153.             w.name = w.name.replace('Process', 'PoolWorker')
  154.             w.daemon = True
  155.             w.start()
  156.             debug('added worker')
  157.         
  158.  
  159.     
  160.     def _maintain_pool(self):
  161.         '''Clean up any exited workers and start replacements for them.
  162.         '''
  163.         if self._join_exited_workers():
  164.             self._repopulate_pool()
  165.  
  166.     
  167.     def _setup_queues(self):
  168.         SimpleQueue = SimpleQueue
  169.         import queues
  170.         self._inqueue = SimpleQueue()
  171.         self._outqueue = SimpleQueue()
  172.         self._quick_put = self._inqueue._writer.send
  173.         self._quick_get = self._outqueue._reader.recv
  174.  
  175.     
  176.     def apply(self, func, args = (), kwds = { }):
  177.         '''
  178.         Equivalent of `apply()` builtin
  179.         '''
  180.         if not self._state == RUN:
  181.             raise AssertionError
  182.         return None.apply_async(func, args, kwds).get()
  183.  
  184.     
  185.     def map(self, func, iterable, chunksize = None):
  186.         '''
  187.         Equivalent of `map()` builtin
  188.         '''
  189.         if not self._state == RUN:
  190.             raise AssertionError
  191.         return None.map_async(func, iterable, chunksize).get()
  192.  
  193.     
  194.     def imap(self, func, iterable, chunksize = 1):
  195.         '''
  196.         Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
  197.         '''
  198.         if not self._state == RUN:
  199.             raise AssertionError
  200.         if None == 1:
  201.             result = IMapIterator(self._cache)
  202.             (None, self._taskqueue.put)(((lambda .0: pass)(enumerate(iterable)), result._set_length))
  203.             return result
  204.         if not None > 1:
  205.             raise AssertionError
  206.         task_batches = None._get_tasks(func, iterable, chunksize)
  207.         result = IMapIterator(self._cache)
  208.         (self._taskqueue.put,)(((lambda .0: pass)(enumerate(task_batches)), result._set_length))
  209.         return (lambda .0: pass)(result)
  210.  
  211.     
  212.     def imap_unordered(self, func, iterable, chunksize = 1):
  213.         '''
  214.         Like `imap()` method but ordering of results is arbitrary
  215.         '''
  216.         if not self._state == RUN:
  217.             raise AssertionError
  218.         if None == 1:
  219.             result = IMapUnorderedIterator(self._cache)
  220.             (None, self._taskqueue.put)(((lambda .0: pass)(enumerate(iterable)), result._set_length))
  221.             return result
  222.         if not None > 1:
  223.             raise AssertionError
  224.         task_batches = None._get_tasks(func, iterable, chunksize)
  225.         result = IMapUnorderedIterator(self._cache)
  226.         (self._taskqueue.put,)(((lambda .0: pass)(enumerate(task_batches)), result._set_length))
  227.         return (lambda .0: pass)(result)
  228.  
  229.     
  230.     def apply_async(self, func, args = (), kwds = { }, callback = None):
  231.         '''
  232.         Asynchronous equivalent of `apply()` builtin
  233.         '''
  234.         if not self._state == RUN:
  235.             raise AssertionError
  236.         result = None(self._cache, callback)
  237.         self._taskqueue.put(([
  238.             (result._job, None, func, args, kwds)], None))
  239.         return result
  240.  
  241.     
  242.     def map_async(self, func, iterable, chunksize = None, callback = None):
  243.         '''
  244.         Asynchronous equivalent of `map()` builtin
  245.         '''
  246.         if not self._state == RUN:
  247.             raise AssertionError
  248.         if not None(iterable, '__len__'):
  249.             iterable = list(iterable)
  250.         if chunksize is None:
  251.             (chunksize, extra) = divmod(len(iterable), len(self._pool) * 4)
  252.             if extra:
  253.                 chunksize += 1
  254.             
  255.         if len(iterable) == 0:
  256.             chunksize = 0
  257.         task_batches = Pool._get_tasks(func, iterable, chunksize)
  258.         result = MapResult(self._cache, chunksize, len(iterable), callback)
  259.         (self._taskqueue.put,)(((lambda .0: pass)(enumerate(task_batches)), None))
  260.         return result
  261.  
  262.     
  263.     def _handle_workers(pool):
  264.         thread = threading.current_thread()
  265.         while (thread._state == RUN or pool._cache) and thread._state != TERMINATE:
  266.             pool._maintain_pool()
  267.             time.sleep(0.1)
  268.         pool._taskqueue.put(None)
  269.         debug('worker handler exiting')
  270.  
  271.     _handle_workers = staticmethod(_handle_workers)
  272.     
  273.     def _handle_tasks(taskqueue, put, outqueue, pool):
  274.         thread = threading.current_thread()
  275.         for taskseq, set_length in iter(taskqueue.get, None):
  276.             i = -1
  277.             for i, task in enumerate(taskseq):
  278.                 if thread._state:
  279.                     debug('task handler found thread._state != RUN')
  280.                     break
  281.                 
  282.                 try:
  283.                     put(task)
  284.                 continue
  285.                 except IOError:
  286.                     debug('could not put task on queue')
  287.                     break
  288.                     continue
  289.                 
  290.  
  291.             elif set_length:
  292.                 debug('doing set_length()')
  293.                 set_length(i + 1)
  294.                 continue
  295.                 continue
  296.                 break
  297.             debug('task handler got sentinel')
  298.             
  299.             try:
  300.                 debug('task handler sending sentinel to result handler')
  301.                 outqueue.put(None)
  302.                 debug('task handler sending sentinel to workers')
  303.                 for p in pool:
  304.                     put(None)
  305.             except IOError:
  306.                 debug('task handler got IOError when sending sentinels')
  307.  
  308.             debug('task handler exiting')
  309.             return None
  310.  
  311.     _handle_tasks = staticmethod(_handle_tasks)
  312.     
  313.     def _handle_results(outqueue, get, cache):
  314.         thread = threading.current_thread()
  315.         while None:
  316.             
  317.             try:
  318.                 task = get()
  319.             except (IOError, EOFError):
  320.                 debug('result handler got EOFError/IOError -- exiting')
  321.                 return None
  322.  
  323.             if thread._state:
  324.                 if not thread._state == TERMINATE:
  325.                     raise AssertionError
  326.                 None('result handler found thread._state=TERMINATE')
  327.                 break
  328.             if task is None:
  329.                 debug('result handler got sentinel')
  330.                 break
  331.             (job, i, obj) = task
  332.             
  333.             try:
  334.                 cache[job]._set(i, obj)
  335.             continue
  336.             except KeyError:
  337.                 continue
  338.             
  339.  
  340.             while cache and thread._state != TERMINATE:
  341.                 
  342.                 try:
  343.                     task = get()
  344.                 except (IOError, EOFError):
  345.                     debug('result handler got EOFError/IOError -- exiting')
  346.                     return None
  347.  
  348.                 if task is None:
  349.                     debug('result handler ignoring extra sentinel')
  350.                     continue
  351.                 (job, i, obj) = task
  352.                 
  353.                 try:
  354.                     cache[job]._set(i, obj)
  355.                 continue
  356.                 except KeyError:
  357.                     continue
  358.                 
  359.  
  360.             if hasattr(outqueue, '_reader'):
  361.                 debug('ensuring that outqueue is not full')
  362.                 
  363.                 try:
  364.                     for i in range(10):
  365.                         if not outqueue._reader.poll():
  366.                             break
  367.                         get()
  368.                 except (IOError, EOFError):
  369.                     pass
  370.                 
  371.  
  372.         debug('result handler exiting: len(cache)=%s, thread._state=%s', len(cache), thread._state)
  373.  
  374.     _handle_results = staticmethod(_handle_results)
  375.     
  376.     def _get_tasks(func, it, size):
  377.         it = iter(it)
  378.         while None:
  379.             x = tuple(itertools.islice(it, size))
  380.             if not x:
  381.                 return None
  382.             yield (None, x)
  383.             continue
  384.             return None
  385.  
  386.     _get_tasks = staticmethod(_get_tasks)
  387.     
  388.     def __reduce__(self):
  389.         raise NotImplementedError('pool objects cannot be passed between processes or pickled')
  390.  
  391.     
  392.     def close(self):
  393.         debug('closing pool')
  394.         if self._state == RUN:
  395.             self._state = CLOSE
  396.             self._worker_handler._state = CLOSE
  397.  
  398.     
  399.     def terminate(self):
  400.         debug('terminating pool')
  401.         self._state = TERMINATE
  402.         self._worker_handler._state = TERMINATE
  403.         self._terminate()
  404.  
  405.     
  406.     def join(self):
  407.         debug('joining pool')
  408.         if not self._state in (CLOSE, TERMINATE):
  409.             raise AssertionError
  410.         None._worker_handler.join()
  411.         self._task_handler.join()
  412.         self._result_handler.join()
  413.         for p in self._pool:
  414.             p.join()
  415.         
  416.  
  417.     
  418.     def _help_stuff_finish(inqueue, task_handler, size):
  419.         debug('removing tasks from inqueue until task handler finished')
  420.         inqueue._rlock.acquire()
  421.         while task_handler.is_alive() and inqueue._reader.poll():
  422.             inqueue._reader.recv()
  423.             time.sleep(0)
  424.  
  425.     _help_stuff_finish = staticmethod(_help_stuff_finish)
  426.     
  427.     def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, worker_handler, task_handler, result_handler, cache):
  428.         debug('finalizing pool')
  429.         worker_handler._state = TERMINATE
  430.         task_handler._state = TERMINATE
  431.         debug('helping task handler/workers to finish')
  432.         cls._help_stuff_finish(inqueue, task_handler, len(pool))
  433.         if not result_handler.is_alive() and len(cache) == 0:
  434.             raise AssertionError
  435.         result_handler._state = None
  436.         outqueue.put(None)
  437.         debug('joining worker handler')
  438.         if threading.current_thread() is not worker_handler:
  439.             worker_handler.join(1e+100)
  440.         if pool and hasattr(pool[0], 'terminate'):
  441.             debug('terminating workers')
  442.             for p in pool:
  443.                 if p.exitcode is None:
  444.                     p.terminate()
  445.                     continue
  446.         debug('joining task handler')
  447.         if threading.current_thread() is not task_handler:
  448.             task_handler.join(1e+100)
  449.         debug('joining result handler')
  450.         if threading.current_thread() is not result_handler:
  451.             result_handler.join(1e+100)
  452.         if pool and hasattr(pool[0], 'terminate'):
  453.             debug('joining pool workers')
  454.             for p in pool:
  455.                 if p.is_alive():
  456.                     debug('cleaning up worker %d' % p.pid)
  457.                     p.join()
  458.                     continue
  459.  
  460.     _terminate_pool = classmethod(_terminate_pool)
  461.  
  462.  
  463. class ApplyResult(object):
  464.     
  465.     def __init__(self, cache, callback):
  466.         self._cond = threading.Condition(threading.Lock())
  467.         self._job = job_counter.next()
  468.         self._cache = cache
  469.         self._ready = False
  470.         self._callback = callback
  471.         cache[self._job] = self
  472.  
  473.     
  474.     def ready(self):
  475.         return self._ready
  476.  
  477.     
  478.     def successful(self):
  479.         if not self._ready:
  480.             raise AssertionError
  481.         return None._success
  482.  
  483.     
  484.     def wait(self, timeout = None):
  485.         self._cond.acquire()
  486.         
  487.         try:
  488.             if not self._ready:
  489.                 self._cond.wait(timeout)
  490.         finally:
  491.             self._cond.release()
  492.  
  493.  
  494.     
  495.     def get(self, timeout = None):
  496.         self.wait(timeout)
  497.         if not self._ready:
  498.             raise TimeoutError
  499.         if self._success:
  500.             return self._value
  501.         raise None._value
  502.  
  503.     
  504.     def _set(self, i, obj):
  505.         (self._success, self._value) = obj
  506.         if self._callback and self._success:
  507.             self._callback(self._value)
  508.         self._cond.acquire()
  509.         
  510.         try:
  511.             self._ready = True
  512.             self._cond.notify()
  513.         finally:
  514.             self._cond.release()
  515.  
  516.         del self._cache[self._job]
  517.  
  518.  
AsyncResult = ApplyResult       # public alias for ApplyResult
  520.  
  521. class MapResult(ApplyResult):
  522.     
  523.     def __init__(self, cache, chunksize, length, callback):
  524.         ApplyResult.__init__(self, cache, callback)
  525.         self._success = True
  526.         self._value = [
  527.             None] * length
  528.         self._chunksize = chunksize
  529.         if chunksize <= 0:
  530.             self._number_left = 0
  531.             self._ready = True
  532.             del cache[self._job]
  533.         else:
  534.             self._number_left = length // chunksize + bool(length % chunksize)
  535.  
  536.     
  537.     def _set(self, i, success_result):
  538.         (success, result) = success_result
  539.  
  540.  
  541.  
  542. class IMapIterator(object):
  543.     
  544.     def __init__(self, cache):
  545.         self._cond = threading.Condition(threading.Lock())
  546.         self._job = job_counter.next()
  547.         self._cache = cache
  548.         self._items = collections.deque()
  549.         self._index = 0
  550.         self._length = None
  551.         self._unsorted = { }
  552.         cache[self._job] = self
  553.  
  554.     
  555.     def __iter__(self):
  556.         return self
  557.  
  558.     
  559.     def next(self, timeout = None):
  560.         self._cond.acquire()
  561.         
  562.         try:
  563.             item = self._items.popleft()
  564.         except IndexError:
  565.             if self._index == self._length:
  566.                 raise StopIteration
  567.             self._cond.wait(timeout)
  568.             
  569.             try:
  570.                 item = self._items.popleft()
  571.             except IndexError:
  572.                 if self._index == self._length:
  573.                     raise StopIteration
  574.                 raise TimeoutError
  575.             
  576.  
  577.         finally:
  578.             self._cond.release()
  579.  
  580.         (success, value) = item
  581.         if success:
  582.             return value
  583.         raise None
  584.  
  585.     __next__ = next
  586.     
  587.     def _set(self, i, obj):
  588.         self._cond.acquire()
  589.         
  590.         try:
  591.             if self._index == self._length:
  592.                 del self._cache[self._job]
  593.         finally:
  594.             self._cond.release()
  595.  
  596.  
  597.     
  598.     def _set_length(self, length):
  599.         self._cond.acquire()
  600.         
  601.         try:
  602.             self._length = length
  603.             if self._index == self._length:
  604.                 self._cond.notify()
  605.                 del self._cache[self._job]
  606.         finally:
  607.             self._cond.release()
  608.  
  609.  
  610.  
  611.  
  612. class IMapUnorderedIterator(IMapIterator):
  613.     
  614.     def _set(self, i, obj):
  615.         self._cond.acquire()
  616.         
  617.         try:
  618.             self._items.append(obj)
  619.             self._index += 1
  620.             self._cond.notify()
  621.             if self._index == self._length:
  622.                 del self._cache[self._job]
  623.         finally:
  624.             self._cond.release()
  625.  
  626.  
  627.  
  628.  
class ThreadPool(Pool):
    # Thread-backed variant of Pool: workers come from the `dummy` module,
    # which provides a Process-like API implemented with threads.
    from dummy import Process
    
    def __init__(self, processes = None, initializer = None, initargs = ()):
        # No maxtasksperchild here -- worker threads live for the pool's
        # whole lifetime.
        Pool.__init__(self, processes, initializer, initargs)

    
    def _setup_queues(self):
        # Plain thread-safe queues; no pipes are needed between threads, so
        # _quick_put/_quick_get are simply the queue methods themselves.
        self._inqueue = Queue.Queue()
        self._outqueue = Queue.Queue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    
    def _help_stuff_finish(inqueue, task_handler, size):
        # Drop pending work and put one sentinel per worker at the head of
        # inqueue so the worker threads finish quickly during termination.
        inqueue.not_empty.acquire()
        
        try:
            inqueue.queue.clear()
            inqueue.queue.extend([
                None] * size)
            inqueue.not_empty.notify_all()
        finally:
            inqueue.not_empty.release()


    _help_stuff_finish = staticmethod(_help_stuff_finish)
  656.  
  657.